package cn.aijson.demo.streaming

import com.hankcs.hanlp.HanLP
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}

/**
 * Spark Streaming demo that finds hot search words.
 *
 * It reads query lines from a local socket and segments them into words with HanLP. Words are
 * counted over a 20s window that slides every 10s. The top words are printed to the console,
 * written as text files, and inserted into MySQL.
 */
object SogouTopWindows {

  // Segmentation tokens that are not meaningful hot words (blank tokens are dropped as well).
  private val NoiseWords: Set[String] = Set(".", "+")

  // NOTE(review): connection settings and credentials are hard-coded for a local demo;
  // move them to configuration or program arguments for real deployments.
  private val JdbcUrl: String = "jdbc:mysql://localhost:8889/test?characterEncoding=UTF-8&useSSL=false"
  private val JdbcUser: String = "root"
  private val JdbcPassword: String = "root"
  private val InsertSql: String = "INSERT INTO `t_hotwords` (`time`, `word`, `count`) VALUES (?, ?, ?);"

  def main(args: Array[String]): Unit = {
    // 0. Prepare the environment
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5)) // one micro-batch every 5s

    // 1. Load data: text lines from a local socket
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    // 2. Process: split each line into words, then count them over a 20s window sliding every 10s
    val resultDS: DStream[(String, Int)] = lines.flatMap { line =>
      import scala.collection.JavaConverters._ // convert the Java list returned by HanLP to a Scala collection
      HanLP.segment(line).asScala.map(_.word) // one element per segmented word
    }
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))

    // Sort each window by count, descending. The transform function runs on the driver once per
    // batch, so take(3) launches a job and the top-3 printout appears on the driver console.
    val sortedResultDS: DStream[(String, Int)] = resultDS.transform { rdd =>
      val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, ascending = false)
      val top3: Array[(String, Int)] = sortRDD.take(3)
      println("=======top3=====")
      top3.foreach(println)
      println("=======top3=====")
      sortRDD
    }

    // 3. Output results
    sortedResultDS.print() // default output
    // Custom output
    sortedResultDS.foreachRDD { (rdd1, time) =>
      // Keep the top 5 words, ignoring noise tokens. The filter must test the word (_1), not the count.
      val top5: Array[(String, Int)] = rdd1
        .filter { case (word, _) => !NoiseWords.contains(word) && word.trim.nonEmpty }
        .sortBy(_._2, ascending = false)
        .take(5)
      val rdd: RDD[(String, Int)] = rdd1.sparkContext.parallelize(top5)
      val milliseconds: Long = time.milliseconds
      println("------自定义输出---------")
      println(s"batchtime:$milliseconds")
      println("------自定义输出---------")
      // Write the result to the console, to files (one directory per batch), and to MySQL.
      rdd.foreach(println)
      rdd.coalesce(1).saveAsTextFile(s"data/output/result-$milliseconds")
      saveToMySQL(rdd, milliseconds)
    }

    // 4. Start, then block until the streaming job is stopped manually or fails
    ssc.start()
    try {
      ssc.awaitTermination()
    } finally {
      // 5. Release resources. This also runs when awaitTermination rethrows a job failure.
      ssc.stop(stopSparkContext = true, stopGracefully = true) // graceful shutdown
    }
  }

  /**
   * Inserts one `(time, word, count)` row per element into `t_hotwords`. Each non-empty
   * partition opens one JDBC connection and sends its rows as a single batch.
   *
   * Expected table:
   * {{{
   * CREATE TABLE `t_hotwords` (
   *   `time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   *   `word` varchar(255) NOT NULL,
   *   `count` int(11) DEFAULT NULL,
   *   PRIMARY KEY (`time`,`word`)
   * ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   * }}}
   *
   * @param rdd         (word, count) pairs to persist
   * @param batchMillis batch time in epoch milliseconds, stored in the `time` column
   */
  private def saveToMySQL(rdd: RDD[(String, Int)], batchMillis: Long): Unit =
    rdd.foreachPartition { iter =>
      // Do not open a connection for partitions that have nothing to write.
      if (iter.hasNext) {
        val conn: Connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
        try {
          val ps: PreparedStatement = conn.prepareStatement(InsertSql)
          try {
            val ts = new Timestamp(batchMillis)
            iter.foreach { case (word, count) =>
              ps.setTimestamp(1, ts)
              ps.setString(2, word)
              ps.setInt(3, count)
              ps.addBatch()
            }
            ps.executeBatch()
          } finally {
            ps.close() // close the statement before its connection
          }
        } finally {
          conn.close()
        }
      }
    }
}
